1. Overview
之前提到过, ThreadPoolExecutor
的内部类Worker
维护了一个非常基本的锁结构. 方法不多, 可以由名知义:
- lock
- tryLock
- unlock
- isLocked
Worker
本身继承了AbstractQueuedSynchronizer
(AQS)来实现上述功能的. 事实上, AQS是Java中数个并发工具的基础, 功能也比较复杂. 我们先从这些并发工具中最简单的Worker
开始, 之后逐步递进:
- ThreadPoolExecutor#Worker
- CountDownLatch
- Semaphore
- FutureTask (JDK 1.6)
- ReentrantLock
- ConditionObject
- ReadWriteReentrantLock
- AQS总结
2. 一个最简单的独占锁
我们可以基于CAS实现一种锁. 代码如下:
1 | import java.util.concurrent.atomic.AtomicBoolean; |
- 类比我们的布尔变量
lockState
, AQS中使用的是一个整形字段 -state
. 在互斥(即同时只能由一个线程持有锁)的场景下,state
可能的取值只有0 / 1两种, 分别对应false
,true
. tryLock
只是简单的尝试获取锁, 而在实际的lock
方法中, 等待获取锁的线程都是被阻塞住的. 而这些线程的引用, 被维护于一个队列结构中.
3. A Thread-Safe Queue
AQS中另一个主体是一个基于双向链表的队列结构. 不考虑AQS逻辑, 实现如下:
1 | /** |
初始状态下, head
, tail
指向同一个节点. 如果没有出队列操作, 只是修改tail
引用的节点. 观察enqueue
方法, 思路和Atomic
工具中的getAndSet
思路是一致的. 示例中的类和变量在AQS中都可以找到对应.
4. Worker
AQS是一个抽象类. 其他工具都是以继承的方式覆盖AQS的几个钩子方法.
4.1 tryLock
Worker#tryLock
和我们的CASLock
的基本思想是一致的, 即通过一个原子变量来表示当前锁的状态, 只不过CASLock
中使用的是true/false
, 而Worker
中使用的是1/0
, 维持这个状态的是继承于AQS的state
字段.
从下面的流程图可以看出来整体逻辑非常简单. 从这里开始, 稍微值得关注是一个问题是哪些方法是Worker
重载的, 而哪些是继承的.
当成功拿到锁之后, 会将exclusiveOwnerThread
设为当前线程, 根据名称可以看出来, 仅当独占时才会用到此变量.
4.2 isLocked
同样是基于state
的简单逻辑: 即AQS#getState() == 0
.
4.3 lock
lock
的方法比较复杂. 如图:
- 进入方法时, 首先进行了一次
tryAcquire
, 如果成功那真是最好的结局. 否则开始”等待”. - 等待指的就是当前线程被抽象为一个节点, 进入了队列尾端(See
addWaiter
) . 就开始”等待”, 随即进入AQS#acquireQueued
逻辑. acquireQueued
内, 线程进入了一个循环体系中.- 仅当满足 当前节点前面节点为头结点 并且 tryAcquire成功 时才能跳出循环. 此时为成功获取锁.
- 不满足上述条件线程, 会被挂起(park). 如果一个线程此时无法获取锁, 注意它的前置节点
waitStatus
初始为0,shouldParkAfterFailedAcquire
方法中会将其置为Node.SIGNAL
. 而在下次循环中, 此线程才被真正挂起.
看到这里, 如果对于下面几个问题有疑问, 先保留.
waitStatus
是干什么的?- 线程在
parkAndCheckInterrupt
中, 调用的是Thread.interrupted()
. 如我们所知, 这个方法会清楚并返回线程的中断状态. 而acquireQueued
中如果曾经被中断过, 就会把中断状态返回给acquire
,acquire
中调用selfInterrupt
, 重新将当前线程中断状态设置为`true - 何时执行到
acquireQueued
内部final
块调用的cancelAcquire
?. 在acquireQueue
中的循环, 只有一个出口即满足p == head && tryAcquire(arg)
. 而此时fail
将一定为false
, 所以看似一定不会cancelAcquire
Worker
看似是一个很典型的锁. 但是注意的是它的构造方法有这么一句1
setState(-1); // inhibit interrupts until runWorker
初始情况state == -1
, 所以compareAndSet(0, 1)
是永远无法成功的, 后面对此进行说明. 这里先假设, 初始状态state == 0
. 基于一个简单的示例说明数据变化
1 | Worker worker = new Worker(); |
这个过程的数据变化如下图(Step 3可以留到下节后再回来看):
4.4 unlock
假如线程A获取了锁, 线程B在等待. 当前专题下
Worker
的tryRelease
方法是一定会成功的.
// TODO 流程图 + 实例
回顾
虽然前文曾经表示可以暂时将Worker
看作一个ReentrantLock
, 然后实际上两者行为还是有一些不一致的. 先来看下, ThreadPoolExecutor
都有哪里调用了Worker
的锁行为.
- 构造方法设置
state
为-1. runWorker
时先unlock
- 在循环中, 每个task执行前后
lock
,unlock
interruptIdleWorkers
前后tryLock
,unlock
第1, 2点注释也有说明, 1是为了禁止被interrupt
, 2是允许interrupt
. 而且由于第一点的存在, 如果当做普通锁, 上来就lock()
的话, 是会一直阻塞的, 所以前面把初始状态设置为0.
在大部分情况下, Woker这个同步工具并没有涉及到多线程间通信, 它只被线程池中的线程持有. 而当主线程试图interrupt
线程池中线程时, 才有多个线程对锁的争用. ThreadPoolExecutor
中
shutdown
中会调用interruptIdleWorkers
(tryLock
成功再interrupt
)shutdownNow
调用interruptWorkers
(对所有线程都interrupt
).
这两个方法中一个会判断getState() >= 0
, 一个会tryLock()
. 所以当getState() == -1
时两个操作都无效.
另外, Worker
的tryRelease
方法很粗暴. 如果有ReentrantLock
使用经验会发现, 如果某线程不是锁的持有者但又试图unlock
, 会抛出IllegalMonitorStateException
. 而Worker
则不然, A线程lock
后B线程也可以unlock
.
单独提取Worker
中部分同步相关代码, 可以注意下那些方法是Override
的, 以及暴露的方法和AQS的对应关系:
1 | @SuppressWarnings("serial") |